diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index a6518abf45..c099ca77b9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -598,4 +598,15 @@ object JavaPairRDD { new JavaPairRDD[K, V](rdd) implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd + + + /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */ + def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairRDD[K, V](rdd.rdd) + } + } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 3c466ade93..70bc25070a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -474,10 +474,10 @@ class StreamingContext private ( * the DStreams. */ def transform[T: ClassManifest]( - streams: Seq[DStream[_]], + dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[T] ): DStream[T] = { - new TransformedDStream[T](streams, sparkContext.clean(transformFunc)) + new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc)) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 309c0fa24b..4dd6b7d096 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -711,6 +711,11 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } + /** Convert to a JavaDStream */ + def toJavaDStream(): JavaDStream[(K, V)] = { + new JavaDStream[(K, V)](dstream) + } + override val classManifest: ClassManifest[(K, V)] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index a4b1670cd4..cf30b541e1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -33,7 +33,7 @@ import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy} @@ -616,6 +616,54 @@ class JavaStreamingContext(val ssc: StreamingContext) { new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) } + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + */ + def transform[T]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]] + ): JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + */ + def transform[K, V]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] + ): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + /** * Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. The graph will be checkpointed every batch interval. diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 2f92421367..f588afe90c 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.io.Files; import kafka.serializer.StringDecoder; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -292,8 +293,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9)); JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc()); - JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1,2,3)); - JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4,5,6)); + JavaRDD rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3)); + JavaRDD rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6)); JavaRDD rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9)); LinkedList> rdds = Lists.newLinkedList(); @@ -331,7 +332,6 @@ public class JavaAPISuite implements Serializable { } }); } - } ); JavaTestUtils.attachTestOutputStream(transformed); @@ -354,7 +354,8 @@ public class JavaAPISuite implements Serializable { JavaDStream transformed1 = stream.transform( new Function, JavaRDD>() { - @Override public JavaRDD call(JavaRDD in) throws Exception { + @Override + public JavaRDD call(JavaRDD in) throws Exception { return null; } } @@ -421,51 +422,56 @@ public class JavaAPISuite implements Serializable { @Test public void testTransformWith() { List>> stringStringKVStream1 = Arrays.asList( - Arrays.asList(new Tuple2("california", "dodgers"), - new Tuple2("new york", "yankees")), - Arrays.asList(new Tuple2("california", "sharks"), - new Tuple2("new york", "rangers"))); + Arrays.asList( + new Tuple2("california", "dodgers"), + new Tuple2("new york", "yankees")), + Arrays.asList( + new Tuple2("california", "sharks"), + new Tuple2("new york", "rangers"))); List>> stringStringKVStream2 = Arrays.asList( - Arrays.asList(new Tuple2("california", "giants"), - new Tuple2("new york", "mets")), - Arrays.asList(new Tuple2("california", "ducks"), - new Tuple2("new york", "islanders"))); + Arrays.asList( + new Tuple2("california", "giants"), + new Tuple2("new york", "mets")), + Arrays.asList( + new Tuple2("california", "ducks"), + new Tuple2("new york", "islanders"))); List>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2>("california", - new Tuple2("dodgers", "giants")), - new Tuple2>("new york", - new Tuple2("yankees", "mets"))), - Arrays.asList( - new Tuple2>("california", - new Tuple2("sharks", "ducks")), - new Tuple2>("new york", - new Tuple2("rangers", "islanders")))); + Arrays.asList( + new Tuple2>("california", + new Tuple2("dodgers", "giants")), + new Tuple2>("new york", + new Tuple2("yankees", "mets"))), + Arrays.asList( + new Tuple2>("california", + new Tuple2("sharks", "ducks")), + new Tuple2>("new york", + new Tuple2("rangers", "islanders")))); JavaDStream> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); + ssc, stringStringKVStream1, 1); JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream(stream1); JavaDStream> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); + ssc, stringStringKVStream2, 1); JavaPairDStream pairStream2 = JavaPairDStream.fromJavaDStream(stream2); JavaPairDStream> joined = pairStream1.transformWith( pairStream2, - new Function3 < + new Function3< JavaPairRDD, JavaPairRDD, Time, JavaPairRDD> >() { - @Override public JavaPairRDD> call( + @Override + public JavaPairRDD> call( JavaPairRDD rdd1, JavaPairRDD rdd2, Time time - ) throws Exception { + ) throws Exception { return rdd1.join(rdd2); } } @@ -475,9 +481,9 @@ public class JavaAPISuite implements Serializable { List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); - } + @Test public void testVariousTransformWith() { // tests whether all variations of transformWith can be called from Java @@ -566,7 +572,6 @@ public class JavaAPISuite implements Serializable { } ); - JavaPairDStream pairTransformed4 = pairStream1.transformWith( pairStream2, new Function3, JavaPairRDD, Time, JavaPairRDD>() { @@ -578,7 +583,74 @@ public class JavaAPISuite implements Serializable { ); } - @Test + @Test + public void testStreamingContextTransform(){ + List> stream1input = Arrays.asList( + Arrays.asList(1), + Arrays.asList(2) + ); + + List> stream2input = Arrays.asList( + Arrays.asList(3), + Arrays.asList(4) + ); + + List>> pairStream1input = Arrays.asList( + Arrays.asList(new Tuple2(1, "x")), + Arrays.asList(new Tuple2(2, "y")) + ); + + List>>> expected = Arrays.asList( + Arrays.asList(new Tuple2>(1, new Tuple2(1, "x"))), + Arrays.asList(new Tuple2>(2, new Tuple2(2, "y"))) + ); + + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); + JavaPairDStream pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); + + List> listOfDStreams1 = Arrays.>asList(stream1, stream2); + + // This is just to test whether this transform to JavaStream compiles + JavaDStream transformed1 = ssc.transform( + listOfDStreams1, + new Function2>, Time, JavaRDD>() { + public JavaRDD call(List> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 2); + return null; + } + } + ); + + List> listOfDStreams2 = + Arrays.>asList(stream1, stream2, pairStream1.toJavaDStream()); + + JavaPairDStream> transformed2 = ssc.transform( + listOfDStreams2, + new Function2>, Time, JavaPairRDD>>() { + public JavaPairRDD> call(List> listOfRDDs, Time time) { + assert(listOfRDDs.size() == 3); + JavaRDD rdd1 = (JavaRDD)listOfRDDs.get(0); + JavaRDD rdd2 = (JavaRDD)listOfRDDs.get(1); + JavaRDD> rdd3 = (JavaRDD>)listOfRDDs.get(2); + JavaPairRDD prdd3 = JavaPairRDD.fromJavaRDD(rdd3); + PairFunction mapToTuple = new PairFunction() { + @Override + public Tuple2 call(Integer i) throws Exception { + return new Tuple2(i, i); + } + }; + return rdd1.union(rdd2).map(mapToTuple).join(prdd3); + } + } + ); + JavaTestUtils.attachTestOutputStream(transformed2); + List>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + Assert.assertEquals(expected, result); + } + + @Test public void testFlatMap() { List> inputData = Arrays.asList( Arrays.asList("go", "giants"),