Docs, some tests, and work on StreamingContext
Patrick Wendell 2013-01-09 19:27:32 -08:00
parent 7e1049d8f1
commit 560c312c60
5 changed files with 621 additions and 36 deletions

JavaDStream.scala

@@ -4,15 +4,25 @@ import spark.streaming.{Time, DStream}
import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD
import java.util.{List => JList}
import spark.storage.StorageLevel
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
extends JavaDStreamLike[T, JavaDStream[T]] {
/** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
dstream.filter((x => f(x).booleanValue()))
/** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaDStream[T] = dstream.cache()
/** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): JavaDStream[T] = dstream.cache()
/** Persists the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
/** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaRDD[T] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaRDD(rdd)
@@ -20,15 +30,38 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
}
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
* @param windowTime width of the window; must be a multiple of this DStream's interval.
* @return a new DStream over windowed batches of this DStream
*/
def window(windowTime: Time): JavaDStream[T] =
dstream.window(windowTime)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* @param windowTime duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
* @param slideTime sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
def window(windowTime: Time, slideTime: Time): JavaDStream[T] =
dstream.window(windowTime, slideTime)
/**
* Returns a new DStream computed based on a tumbling window over this DStream.
* This is equivalent to window(batchTime, batchTime).
* @param batchTime tumbling window duration; must be a multiple of this DStream's interval
*/
def tumble(batchTime: Time): JavaDStream[T] =
dstream.tumble(batchTime)
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
*/
def union(that: JavaDStream[T]): JavaDStream[T] =
dstream.union(that.dstream)
}
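
A minimal usage sketch (not part of this diff; the master string, host, ports, and durations below are all illustrative):

JavaStreamingContext jssc =
    new JavaStreamingContext("local[2]", "WindowSketch", new Time(1000));
JavaDStream<String> lines = jssc.networkTextStream("localhost", 9999);
// Each RDD covers the last 30 seconds of data, regenerated every batch interval.
JavaDStream<String> windowed = lines.window(new Time(30000));
// Non-overlapping 10-second batches: equivalent to window(10000, 10000).
JavaDStream<String> tumbled = lines.tumble(new Time(10000));
// Merge two streams that share the same slide interval.
JavaDStream<String> merged = lines.union(jssc.networkTextStream("localhost", 9998));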

View file

@@ -16,41 +16,81 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def dstream: DStream[T]
/**
* Prints the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and therefore materialized.
*/
def print() = dstream.print()
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
* of elements in each RDD of this DStream.
*/
def count(): JavaDStream[Int] = dstream.count()
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowTime and slideTime are as defined in the
* window() operation. This is equivalent to window(windowTime, slideTime).count()
*/
def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
dstream.countByWindow(windowTime, slideTime)
}
/**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
def glom(): JavaDStream[JList[T]] =
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
/** Returns the StreamingContext associated with this DStream */
def context(): StreamingContext = dstream.context()
/** Returns a new DStream by applying a function to all elements of this DStream. */
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
}
/** Returns a new DStream by applying a function to all elements of this DStream. */
def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
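
A sketch of the two map flavors from Java (the `lines` stream is hypothetical):

JavaDStream<Integer> lengths = lines.map(new Function<String, Integer>() {
  @Override
  public Integer call(String s) {
    return s.length();
  }
});
// Mapping through a PairFunction yields a JavaPairDStream for key-based operations.
JavaPairDStream<String, Integer> ones = lines.map(new PairFunction<String, String, Integer>() {
  @Override
  public Tuple2<String, Integer> call(String s) {
    return new Tuple2<String, Integer>(s, 1);
  }
});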
/**
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
}
/**
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
: JavaPairDStream[K, V] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
}
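
For example, mapPartitions can aggregate each partition in one pass; a sketch assuming a hypothetical JavaDStream<Integer> named `ints`:

JavaDStream<Integer> partitionSums = ints.mapPartitions(
  new FlatMapFunction<java.util.Iterator<Integer>, Integer>() {
    @Override
    public Iterable<Integer> call(java.util.Iterator<Integer> it) {
      int sum = 0;
      while (it.hasNext()) {
        sum += it.next();  // single pass over the partition
      }
      return java.util.Arrays.asList(sum);  // one element per partition
    }
  });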
/**
* Returns a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
* elements in a window over this DStream. windowTime and slideTime are as defined in the
* window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
invReduceFunc: JFunction2[T, T, T],
@@ -59,18 +99,33 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
}
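
A sketch of an incrementally maintained window sum (stream and durations hypothetical); the inverse function lets values leaving the window be subtracted out instead of recomputing the whole window:

JavaDStream<Integer> windowSums = ints.reduceByWindow(
  new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) { return a + b; }  // combine new values
  },
  new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) { return a - b; }  // invert departing values
  },
  new Time(30000), new Time(10000));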
/**
* Returns all the RDDs from 'fromTime' to 'toTime' (both inclusive)
*/
def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
}
/**
* Applies a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
}
/**
* Applies a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
}
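
A sketch of foreach as the generic output hook, here just logging each batch's size:

lines.foreach(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) {
    System.out.println("Batch of " + rdd.count() + " records");
    return null;  // Void-returning output operator
  }
});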
/**
* Returns a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
@@ -79,6 +134,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
dstream.transform(scalaTransform(_))
}
/**
* Returns a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
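
transform exposes the underlying RDD of every batch, so any RDD-to-RDD operation can be spliced into a stream. A sketch, assuming JavaRDD exposes distinct():

JavaDStream<String> deduped = lines.transform(
  new Function<JavaRDD<String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaRDD<String> rdd) {
      return rdd.distinct();  // de-duplicate within each batch
    }
  });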

JavaPairDStream.scala

@@ -12,18 +12,31 @@ import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.{JavaPairRDD, JavaRDD}
import spark.storage.StorageLevel
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
-// Common to all DStream's
+// =======================================================================
+// Methods common to all DStream's
+// =======================================================================
/** Returns a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
dstream.filter((x => f(x).booleanValue()))
/** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
/** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): JavaPairDStream[K, V] = dstream.cache()
/** Persists the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
/** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaPairRDD[K, V] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaPairRDD(rdd)
@@ -31,19 +44,45 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
}
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
* @param windowTime width of the window; must be a multiple of this DStream's interval.
* @return a new DStream over windowed batches of this DStream
*/
def window(windowTime: Time): JavaPairDStream[K, V] =
dstream.window(windowTime)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* @param windowTime duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
* @param slideTime sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] =
dstream.window(windowTime, slideTime)
/**
* Returns a new DStream computed based on a tumbling window over this DStream.
* This is equivalent to window(batchTime, batchTime).
* @param batchTime tumbling window duration; must be a multiple of this DStream's interval
*/
def tumble(batchTime: Time): JavaPairDStream[K, V] =
dstream.tumble(batchTime)
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
*/
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
-// Only for PairDStreams...
+// =======================================================================
+// Methods only for PairDStream's
+// =======================================================================
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
@@ -59,8 +98,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
dstream.reduceByKey(func, numPartitions)
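
A sketch of these pair operations from Java, given a hypothetical `ones` stream of (word, 1) tuples:

// Per-batch word counts.
JavaPairDStream<String, Integer> counts = ones.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) { return a + b; }
  });
// Or collect all values for each key instead of reducing them.
JavaPairDStream<String, java.util.List<Integer>> grouped = ones.groupByKey();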
// TODO: TEST BELOW
-def combineByKey[C](createCombiner: Function[V, C],
+def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairDStream[K, C] = {

JavaStreamingContext.scala

@@ -4,27 +4,173 @@ import scala.collection.JavaConversions._
import java.util.{List => JList}
import spark.streaming._
-import dstream.SparkFlumeEvent
+import dstream._
import spark.storage.StorageLevel
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import java.io.InputStream
class JavaStreamingContext(val ssc: StreamingContext) {
def this(master: String, frameworkName: String, batchDuration: Time) =
this(new StreamingContext(master, frameworkName, batchDuration))
-def textFileStream(directory: String): JavaDStream[String] = {
-  ssc.textFileStream(directory)
-}
// TODOs:
// - Test StreamingContext functions
// - Test to/from Hadoop functions
// - Add checkpoint()/remember()
// - Support creating your own streams
// - Add Kafka Stream
/**
* Creates an input stream from a network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
ssc.networkTextStream(hostname, port, storageLevel)
}
/**
* Creates an input stream from a network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
ssc.networkTextStream(hostname, port)
}
/**
* Creates an input stream from a network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as objects using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def networkStream[T](
hostname: String,
port: Int,
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaDStream[T] = {
import scala.collection.JavaConverters._
def fn = (x: InputStream) => converter.apply(x).toIterator
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.networkStream(hostname, port, fn, storageLevel)
}
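
A sketch of a custom converter that decodes each connection's bytes as UTF-8 lines (the storage-level accessor is assumed to be the static forwarder Scala generates for the companion object's vals):

JavaDStream<String> custom = jssc.networkStream("localhost", 9999,
  new Function<InputStream, Iterable<String>>() {
    @Override
    public Iterable<String> call(InputStream in) throws Exception {
      BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
      List<String> result = new ArrayList<String>();
      String line;
      while ((line = reader.readLine()) != null) {
        result.add(line);  // one stream element per received line
      }
      return result;
    }
  },
  StorageLevel.MEMORY_AND_DISK_SER_2());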
/**
* Creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
*/
def textFileStream(directory: String): JavaDStream[String] = {
ssc.textFileStream(directory)
}
/**
* Creates an input stream from a network source hostname:port, where data is received
* as serialized blocks (serialized using Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
* way to receive data.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
def rawNetworkStream[T](
hostname: String,
port: Int,
storageLevel: StorageLevel): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
}
/**
* Creates an input stream from a network source hostname:port, where data is received
* as serialized blocks (serialized using Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
* way to receive data.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @tparam T Type of the objects in the received blocks
*/
def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
}
/**
* Creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmf: ClassManifest[F] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
ssc.fileStream[K, V, F](directory)
}
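
Since Java cannot infer the Scala type parameters here, they must be given explicitly. A sketch using the new-API TextInputFormat (the HDFS path is illustrative; LongWritable, Text, and TextInputFormat come from the usual Hadoop io and mapreduce packages):

JavaPairDStream<LongWritable, Text> files =
  jssc.<LongWritable, Text, TextInputFormat>fileStream("hdfs://localhost:9000/incoming");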
/**
* Creates an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port, storageLevel)
}
/**
* Creates an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
def flumeStream(hostname: String, port: Int):
JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
// NOT SUPPORTED: registerInputStream
/**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
ssc.registerOutputStream(outputStream.dstream)
}
/**
* Starts the execution of the streams.
*/
def start() = ssc.start()
/**
* Stops the execution of the streams.
*/
def stop() = ssc.stop()
}
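
An end-to-end lifecycle sketch tying the pieces together (all names and values illustrative):

JavaStreamingContext jssc =
    new JavaStreamingContext("local[2]", "LifecycleSketch", new Time(1000));
JavaDStream<String> lines = jssc.networkTextStream("localhost", 9999);
lines.print();  // an output operator, so the stream is registered automatically
jssc.start();   // begin receiving and processing
// ... later, on shutdown ...
jssc.stop();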

JavaAPISuite.java

@@ -6,6 +6,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaRDD;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
@@ -377,18 +378,31 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "yankees"),
new Tuple2<String, String>("new york", "mets")),
Arrays.asList(new Tuple2<String, String>("california", "sharks"),
new Tuple2<String, String>("california", "ducks"),
new Tuple2<String, String>("new york", "rangers"),
new Tuple2<String, String>("new york", "islanders")));
List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
Arrays.asList(
new Tuple2<String, Integer>("california", 1),
new Tuple2<String, Integer>("california", 3),
new Tuple2<String, Integer>("new york", 4),
new Tuple2<String, Integer>("new york", 1)),
Arrays.asList(
new Tuple2<String, Integer>("california", 5),
new Tuple2<String, Integer>("california", 5),
new Tuple2<String, Integer>("new york", 3),
new Tuple2<String, Integer>("new york", 1)));
@Test
public void testPairGroupByKey() {
-List<List<Tuple2<String, String>>> inputData = Arrays.asList(
-    Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
-        new Tuple2<String, String>("california", "giants"),
-        new Tuple2<String, String>("new york", "yankees"),
-        new Tuple2<String, String>("new york", "mets")),
-    Arrays.asList(new Tuple2<String, String>("california", "sharks"),
-        new Tuple2<String, String>("california", "ducks"),
-        new Tuple2<String, String>("new york", "rangers"),
-        new Tuple2<String, String>("new york", "islanders")));
+List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
Arrays.asList(
@@ -410,18 +424,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void testPairReduceByKey() {
-List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
-    Arrays.asList(
-        new Tuple2<String, Integer>("california", 1),
-        new Tuple2<String, Integer>("california", 3),
-        new Tuple2<String, Integer>("new york", 4),
-        new Tuple2<String, Integer>("new york", 1)),
-    Arrays.asList(
-        new Tuple2<String, Integer>("california", 5),
-        new Tuple2<String, Integer>("california", 5),
-        new Tuple2<String, Integer>("new york", 3),
-        new Tuple2<String, Integer>("new york", 1)));
+List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
@@ -435,17 +438,323 @@ public class JavaAPISuite implements Serializable {
sc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(
-    new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer i1, Integer i2) throws Exception {
-        return i1 + i2;
-      }
-    });
+JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
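
IntegerSum above (and the IntegerDifference used further below) are not shown in this hunk; presumably they are small reusable Function2 helpers along these lines:

class IntegerSum extends Function2<Integer, Integer, Integer> {
  @Override
  public Integer call(Integer i1, Integer i2) throws Exception {
    return i1 + i2;
  }
}

class IntegerDifference extends Function2<Integer, Integer, Integer> {
  @Override
  public Integer call(Integer i1, Integer i2) throws Exception {
    return i1 - i2;
  }
}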
@Test
public void testCombineByKey() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<String, Integer>("california", 4),
new Tuple2<String, Integer>("new york", 5)),
Arrays.asList(
new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
@Override
public Integer call(Integer i) throws Exception {
return i;
}
}, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
JavaTestUtils.attachTestOutputStream(combined);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testCountByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<String, Long>("california", 2L),
new Tuple2<String, Long>("new york", 2L)),
Arrays.asList(
new Tuple2<String, Long>("california", 2L),
new Tuple2<String, Long>("new york", 2L)));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
// TODO: Declaring this as <String, Long> fails with a compile error; investigate.
JavaPairDStream<String, Object> counted = pairStream.countByKey();
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testGroupByKeyAndWindow() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
Arrays.asList(new Tuple2<String, List<String>>("california",
Arrays.asList("sharks", "ducks", "dodgers", "giants")),
new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, List<String>> groupWindowed =
pairStream.groupByKeyAndWindow(new Time(2000), new Time(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testReduceByKeyAndWindow() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(new Tuple2<String, Integer>("california", 4),
new Tuple2<String, Integer>("new york", 5)),
Arrays.asList(new Tuple2<String, Integer>("california", 14),
new Tuple2<String, Integer>("new york", 9)),
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new Time(2000), new Time(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testReduceByKeyAndWindowWithInverse() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
Arrays.asList(new Tuple2<String, Integer>("california", 4),
new Tuple2<String, Integer>("new york", 5)),
Arrays.asList(new Tuple2<String, Integer>("california", 14),
new Tuple2<String, Integer>("new york", 9)),
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Time(2000), new Time(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testCountByKeyAndWindow() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<String, Long>("california", 2L),
new Tuple2<String, Long>("new york", 2L)),
Arrays.asList(
new Tuple2<String, Long>("california", 4L),
new Tuple2<String, Long>("new york", 4L)),
Arrays.asList(
new Tuple2<String, Long>("california", 2L),
new Tuple2<String, Long>("new york", 2L)));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
// TODO: Declaring this as <String, Long> fails with a compile error; investigate.
JavaPairDStream<String, Object> counted = pairStream.countByKeyAndWindow(new Time(2000), new Time(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
new Tuple2<String, String>("california", "GIANTS"),
new Tuple2<String, String>("new york", "YANKEES"),
new Tuple2<String, String>("new york", "METS")),
Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
new Tuple2<String, String>("california", "DUCKS"),
new Tuple2<String, String>("new york", "RANGERS"),
new Tuple2<String, String>("new york", "ISLANDERS")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
@Override
public String call(String s) throws Exception {
return s.toUpperCase();
}
});
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testFlatMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
List<List<Tuple2<String, String>>> expected = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
new Tuple2<String, String>("california", "dodgers2"),
new Tuple2<String, String>("california", "giants1"),
new Tuple2<String, String>("california", "giants2"),
new Tuple2<String, String>("new york", "yankees1"),
new Tuple2<String, String>("new york", "yankees2"),
new Tuple2<String, String>("new york", "mets1"),
new Tuple2<String, String>("new york", "mets2")),
Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
new Tuple2<String, String>("california", "sharks2"),
new Tuple2<String, String>("california", "ducks1"),
new Tuple2<String, String>("california", "ducks2"),
new Tuple2<String, String>("new york", "rangers1"),
new Tuple2<String, String>("new york", "rangers2"),
new Tuple2<String, String>("new york", "islanders1"),
new Tuple2<String, String>("new york", "islanders2")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
new Function<String, Iterable<String>>() {
@Override
public Iterable<String> call(String in) {
List<String> out = new ArrayList<String>();
out.add(in + "1");
out.add(in + "2");
return out;
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testCoGroup() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("new york", "yankees")),
Arrays.asList(new Tuple2<String, String>("california", "sharks"),
new Tuple2<String, String>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "mets")),
Arrays.asList(new Tuple2<String, String>("california", "ducks"),
new Tuple2<String, String>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))),
Arrays.asList(
new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("new york", "yankees")),
Arrays.asList(new Tuple2<String, String>("california", "sharks"),
new Tuple2<String, String>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "mets")),
Arrays.asList(new Tuple2<String, String>("california", "ducks"),
new Tuple2<String, String>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("dodgers", "giants")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("yankees", "mets"))),
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("sharks", "ducks")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertEquals(expected, result);
}
}