Updated TransformedDStream to support n-ary DStream transforms. Added transformWith to DStream, and leftOuterJoin and rightOuterJoin to key-value DStreams, for both the Scala and Java APIs. Also added n-ary union and n-ary transform operations to StreamingContext for the Scala and Java APIs.

This commit is contained in:
Tathagata Das 2013-10-21 05:34:09 -07:00
parent cf64f63f8a
commit 0666498799
11 changed files with 529 additions and 33 deletions
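For orientation, a minimal usage sketch of the headline additions against the Scala API follows. It is editorial, not part of the commit: it assumes an existing StreamingContext `ssc`, and the socket streams, host/port values, and variable names are hypothetical.

// Editorial sketch (assumed setup): exercising the new operations.
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._               // pair functions on RDDs
import org.apache.spark.streaming.StreamingContext._ // pair functions on DStreams

val words1 = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val words2 = ssc.socketTextStream("localhost", 9998).flatMap(_.split(" "))
val pairs1 = words1.map(w => (w, 1))
val pairs2 = words2.map(w => (w, "x"))

// Binary transform across two DStreams: arbitrary RDD-to-RDD logic, here a join.
val joined = pairs1.transformWith(pairs2,
  (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2))

// New outer joins on key-value DStreams, computed per batch.
val left  = pairs1.leftOuterJoin(pairs2)   // DStream[(String, (Int, Option[String]))]
val right = pairs1.rightOuterJoin(pairs2)  // DStream[(String, (Option[Int], String))]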


@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.api.java.function;
import scala.reflect.ClassManifest;
import scala.reflect.ClassManifest$;
import scala.runtime.AbstractFunction2;
import java.io.Serializable;
/**
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
implements Serializable {
public abstract R call(T1 t1, T2 t2, T3 t3) throws Exception;
public ClassManifest<R> returnType() {
return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
}
}


@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.api.java.function
import scala.runtime.AbstractFunction3
/**
* Subclass of AbstractFunction3 for ease of calling from Java. The main thing it does is re-expose
* the apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
* isn't marked to allow that).
*/
private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
extends AbstractFunction3[T1, T2, T3, R] {
@throws(classOf[Exception])
def call(t1: T1, t2: T2, t3: T3): R
final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
}


@@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] (
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
transform((r: RDD[T], t: Time) => transformFunc(r))
transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
}
/**
@@ -508,7 +508,41 @@ abstract class DStream[T: ClassManifest] (
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
new TransformedDStream(this, context.sparkContext.clean(transformFunc))
//new TransformedDStream(this, context.sparkContext.clean(transformFunc))
val cleanedF = context.sparkContext.clean(transformFunc)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
}
new TransformedDStream[U](Seq(this), realTransformFunc)
}
/**
* Return a new DStream in which each RDD is generated by applying a function on each RDD
* of `this` DStream and `other` DStream.
*/
def transformWith[U: ClassManifest, V: ClassManifest](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
}
/**
* Return a new DStream in which each RDD is generated by applying a function on each RDD
* of `this` DStream and `other` DStream.
*/
def transformWith[U: ClassManifest, V: ClassManifest](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V] = {
val cleanedF = ssc.sparkContext.clean(transformFunc)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 2)
val rdd1 = rdds(0).asInstanceOf[RDD[T]]
val rdd2 = rdds(1).asInstanceOf[RDD[U]]
cleanedF(rdd1, rdd2, time)
}
new TransformedDStream[V](Seq(this, other), realTransformFunc)
}
/**
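A usage note on the transformWith variants added above: the three-argument form also receives the batch time, so per-batch logic can depend on it. The sketch below is editorial; `clicks` and `impressions` are hypothetical key-value DStreams with the same slide duration, and the once-a-minute condition is only illustrative.

// Editorial sketch of the time-aware transformWith (names are hypothetical).
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.SparkContext._  // pair functions on RDDs

val matched = clicks.transformWith(impressions,
  (c: RDD[(String, Long)], i: RDD[(String, Long)], batchTime: Time) => {
    // Any RDD-level operation may be used here; the batch time can steer it.
    if (batchTime.milliseconds % 60000 == 0)
      c.join(i).mapValues { case (a, b) => a + b }
    else
      c.leftOuterJoin(i).mapValues { case (a, b) => a + b.getOrElse(0L) }
  })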


@@ -444,27 +444,73 @@ extends Serializable {
}
/**
* Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
* Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
/**
* Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
* Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def join[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
this.cogroup(other, partitioner)
.flatMapValues{
case (vs, ws) =>
for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
)
}
/**
* Return new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
leftOuterJoin[W](other, defaultPartitioner())
}
/**
* Return new DStream by applying 'left outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def leftOuterJoin[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, Option[W]))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
)
}
/**
* Return new DStream by applying 'right outer join' between RDDs of `this` DStream and
* `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
*/
def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
rightOuterJoin[W](other, defaultPartitioner())
}
/**
* Return new DStream by applying 'right outer join' between RDDs of `this` DStream and
* `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Option[V], W))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
)
}
/**
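To make the result shapes of the new outer joins concrete, here is a small editorial sketch (the DStreams `users` and `scores` are hypothetical; only the types come from the methods above):

// Editorial sketch of the per-batch result types of the new outer joins.
import org.apache.spark.streaming.DStream
import org.apache.spark.streaming.StreamingContext._  // pair functions on DStreams

// users:  DStream[(String, String)], e.g. ("u1", "alice")
// scores: DStream[(String, Int)],    e.g. ("u1", 3)
val left:  DStream[(String, (String, Option[Int]))] = users.leftOuterJoin(scores)
val right: DStream[(String, (Option[String], Int))] = users.rightOuterJoin(scores)

// Keys missing from `scores` still show up in `left`, with None on the right side;
// a per-batch default can then be filled in:
val withDefault = left.mapValues { case (name, score) => (name, score.getOrElse(0)) }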


@@ -455,12 +455,23 @@ class StreamingContext private (
}
/**
* Create a unified DStream from multiple DStreams of the same type and same interval
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/**
* Create a new DStream in which each RDD is generated by applying a function on RDDs of
* the DStreams.
*/
def transform[T: ClassManifest](
streams: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
new TransformedDStream[T](streams, sparkContext.clean(transformFunc))
}
/**
* Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
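A short editorial sketch of the new n-ary StreamingContext operations (`s1`, `s2`, `s3` are hypothetical Int DStreams sharing the same slide duration, and `ssc` is their StreamingContext):

// Editorial sketch of the n-ary union and n-ary transform added above.
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{DStream, Time}

val unified: DStream[Int] = ssc.union(Seq(s1, s2, s3))

// n-ary transform: the function receives one RDD per parent DStream for each batch.
// The Seq carries RDD[_], so callers cast elements back to their known types,
// just as DStream.transformWith does internally.
val merged: DStream[Int] = ssc.transform(
  Seq(s1, s2, s3),
  (rdds: Seq[RDD[_]], time: Time) => {
    val typed = rdds.map(_.asInstanceOf[RDD[Int]])
    typed.reduce(_ union _)
  }
)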


@@ -24,7 +24,8 @@ import scala.collection.JavaConversions._
import org.apache.spark.streaming._
import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
import java.util
import org.apache.spark.rdd.RDD
import JavaDStream._
@@ -306,6 +307,82 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
dstream.transform(scalaTransform(_, _))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this and other DStreams.
*/
def transformWith[U, W](
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
): JavaDStream[W] = {
implicit val cmu: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cmv: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this and other DStreams.
*/
def transformWith[U, K2, V2](
other: JavaDStream[U],
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
): JavaPairDStream[K2, V2] = {
implicit val cmu: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cmk2: ClassManifest[K2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
implicit val cmv2: ClassManifest[V2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this and other DStreams.
*/
def transformWith[K, V, W](
other: JavaPairDStream[K, V],
transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaRDD[W]]
): JavaDStream[W] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmw: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[W] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K, V), W](other.dstream, scalaTransform(_, _, _))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this and other DStreams.
*/
def transformWith[K, V, K2, V2](
other: JavaPairDStream[K, V],
transformFunc: JFunction3[R, JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]
): JavaPairDStream[K2, V2] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmk2: ClassManifest[K2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
implicit val cmv2: ClassManifest[V2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
def scalaTransform (inThis: RDD[T], inThat: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
dstream.transformWith[(K, V), (K2, V2)](other.dstream, scalaTransform(_, _, _))
}
/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed


@@ -36,7 +36,7 @@ import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.rdd.PairRDDFunctions
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K],
implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
@@ -499,8 +499,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
* Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
* Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
@@ -509,9 +509,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
* Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
* Return new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, W)] = {
@@ -520,6 +519,52 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.join(other.dstream, partitioner)
}
/**
* Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = dstream.leftOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
/**
* Return new DStream by applying 'left outer join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def leftOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, Optional[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
}
/**
* Return new DStream by applying 'right outer join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = dstream.rightOuterJoin(other.dstream)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
/**
* Return new DStream by applying 'right outer join' between RDDs of `this` DStream and `other` DStream.
* The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def rightOuterJoin[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (Optional[V], W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
}
/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".


@@ -19,7 +19,7 @@ package org.apache.spark.streaming.api.java
import java.lang.{Long => JLong, Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap}
import java.util.{Map => JMap, List => JList}
import scala.collection.JavaConversions._
@@ -584,6 +584,29 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)
}
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {
val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
implicit val cm: ClassManifest[T] = first.classManifest
ssc.union(dstreams)(cm)
}
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[K, V](
first: JavaPairDStream[K, V],
rest: JList[JavaPairDStream[K, V]]
): JavaPairDStream[K, V] = {
val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream)
implicit val cm: ClassManifest[(K, V)] = first.classManifest
implicit val kcm: ClassManifest[K] = first.kManifest
implicit val vcm: ClassManifest[V] = first.vManifest
new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm)
}
/**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. The graph will be checkpointed every batch interval.


@@ -21,16 +21,22 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, DStream, Time}
private[streaming]
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
) extends DStream[U](parent.ssc) {
class TransformedDStream[U: ClassManifest] (
parents: Seq[DStream[_]],
transformFunc: (Seq[RDD[_]], Time) => RDD[U]
) extends DStream[U](parents.head.ssc) {
override def dependencies = List(parent)
require(parents.length > 0, "List of DStreams to transform is empty")
require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
require(parents.map(_.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")
override def slideDuration: Duration = parent.slideDuration
override def dependencies = parents.toList
override def slideDuration: Duration = parents.head.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(transformFunc(_, validTime))
val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
Some(transformFunc(parentRDDs, validTime))
}
}


@@ -223,7 +223,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(mapped);
List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -338,6 +338,58 @@ public class JavaAPISuite implements Serializable {
}
@Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("new york", "yankees")),
Arrays.asList(new Tuple2<String, String>("california", "sharks"),
new Tuple2<String, String>("new york", "rangers")));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "giants"),
new Tuple2<String, String>("new york", "mets")),
Arrays.asList(new Tuple2<String, String>("california", "ducks"),
new Tuple2<String, String>("new york", "islanders")));
List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("dodgers", "giants")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("yankees", "mets"))),
Arrays.asList(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("sharks", "ducks")),
new Tuple2<String, Tuple2<String, String>>("new york",
new Tuple2<String, String>("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
pairStream2,
new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() {
@Override
public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception {
return stringStringJavaPairRDD.join(stringStringJavaPairRDD2);
}
}
);
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("go", "giants"),
@@ -1099,7 +1151,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -1142,7 +1194,38 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@Test
public void testLeftOuterJoin() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
new Tuple2<String, String>("new york", "yankees")),
Arrays.asList(new Tuple2<String, String>("california", "sharks") ));
List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "giants") ),
Arrays.asList(new Tuple2<String, String>("new york", "islanders") )
);
List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2);
JavaDStream<Long> counted = joined.count();
JavaTestUtils.attachTestOutputStream(counted);
List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}


@@ -18,7 +18,10 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
import scala.runtime.RichInt
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
@@ -143,6 +146,72 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
test("union") {
val input = Seq(1 to 4, 101 to 104, 201 to 204)
val output = Seq(1 to 8, 101 to 108, 201 to 208)
testOperation(
input,
(s: DStream[Int]) => s.union(s.map(_ + 4)) ,
output
)
}
test("StreamingContext.union") {
val input = Seq(1 to 4, 101 to 104, 201 to 204)
val output = Seq(1 to 12, 101 to 112, 201 to 212)
// union over 3 DStreams
testOperation(
input,
(s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))),
output
)
}
test("transform") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
input,
(r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform
input.map(_.map(_.toString))
)
}
test("transformWith") {
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
val outputData = Seq(
Seq( ("a", (1, "x")), ("b", (1, "x")) ),
Seq( ("", (1, "x")) ),
Seq( ),
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
val t1 = s1.map(x => (x, 1))
val t2 = s2.map(x => (x, "x"))
t1.transformWith( // RDD.join in transform
t2,
(rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2)
)
}
testOperation(inputData1, inputData2, operation, outputData, true)
}
test("StreamingContext.transform") {
val input = Seq(1 to 4, 101 to 104, 201 to 204)
val output = Seq(1 to 12, 101 to 112, 201 to 212)
// transform over 3 DStreams by doing union of the 3 RDDs
val operation = (s: DStream[Int]) => {
s.context.transform(
Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams
(rdds: Seq[RDD[_]], time: Time) =>
rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs
)
}
testOperation(input, operation, output)
}
test("cogroup") {
val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() )
@@ -168,7 +237,37 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
s1.map(x => (x,1)).join(s2.map(x => (x,"x")))
s1.map(x => (x, 1)).join(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}
test("leftOuterJoin") {
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
val outputData = Seq(
Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ),
Seq( ("", (1, Some("x"))), ("a", (1, None)) ),
Seq( ("", (1, None)) ),
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}
test("rightOuterJoin") {
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
val outputData = Seq(
Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ),
Seq( ("", (Some(1), "x")), ("b", (None, "x")) ),
Seq( ),
Seq( ("", (None, "x")) )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}