Merge pull request #481 from pwendell/stream-rdd-type-streaming
STREAMING-51: Add RDD type as a type parameter in JavaDStreamLike Edit (streaming/ version)
This commit is contained in:
commit
b0565ae396
|
@ -4,6 +4,7 @@ import spark.streaming.{Duration, Time, DStream}
|
|||
import spark.api.java.function.{Function => JFunction}
|
||||
import spark.api.java.JavaRDD
|
||||
import spark.storage.StorageLevel
|
||||
import spark.RDD
|
||||
|
||||
/**
|
||||
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
|
||||
|
@ -26,7 +27,9 @@ import spark.storage.StorageLevel
|
|||
* - A function that is used to generate an RDD after each time interval
|
||||
*/
|
||||
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
|
||||
extends JavaDStreamLike[T, JavaDStream[T]] {
|
||||
extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
|
||||
|
||||
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
|
||||
|
||||
/** Return a new DStream containing only the elements that satisfy a predicate. */
|
||||
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
|
||||
|
|
|
@ -6,17 +6,20 @@ import java.lang.{Long => JLong}
|
|||
import scala.collection.JavaConversions._
|
||||
|
||||
import spark.streaming._
|
||||
import spark.api.java.JavaRDD
|
||||
import spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
|
||||
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
|
||||
import java.util
|
||||
import spark.RDD
|
||||
import JavaDStream._
|
||||
|
||||
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
|
||||
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
|
||||
extends Serializable {
|
||||
implicit val classManifest: ClassManifest[T]
|
||||
|
||||
def dstream: DStream[T]
|
||||
|
||||
def wrapRDD(in: RDD[T]): R
|
||||
|
||||
implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
|
||||
in.map(new JLong(_))
|
||||
}
|
||||
|
@ -212,35 +215,35 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
|
|||
/**
|
||||
* Return all the RDDs between 'fromDuration' to 'toDuration' (both included)
|
||||
*/
|
||||
def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
|
||||
new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
|
||||
def slice(fromTime: Time, toTime: Time): JList[R] = {
|
||||
new util.ArrayList(dstream.slice(fromTime, toTime).map(wrapRDD(_)).toSeq)
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a function to each RDD in this DStream. This is an output operator, so
|
||||
* this DStream will be registered as an output stream and therefore materialized.
|
||||
*/
|
||||
def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
|
||||
dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
|
||||
def foreach(foreachFunc: JFunction[R, Void]) {
|
||||
dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a function to each RDD in this DStream. This is an output operator, so
|
||||
* this DStream will be registered as an output stream and therefore materialized.
|
||||
*/
|
||||
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
|
||||
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
|
||||
def foreach(foreachFunc: JFunction2[R, Time, Void]) {
|
||||
dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new DStream in which each RDD is generated by applying a function
|
||||
* on each RDD of this DStream.
|
||||
*/
|
||||
def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
|
||||
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
|
||||
implicit val cm: ClassManifest[U] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
|
||||
def scalaTransform (in: RDD[T]): RDD[U] =
|
||||
transformFunc.call(new JavaRDD[T](in)).rdd
|
||||
transformFunc.call(wrapRDD(in)).rdd
|
||||
dstream.transform(scalaTransform(_))
|
||||
}
|
||||
|
||||
|
@ -248,11 +251,41 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
|
|||
* Return a new DStream in which each RDD is generated by applying a function
|
||||
* on each RDD of this DStream.
|
||||
*/
|
||||
def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
|
||||
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
|
||||
implicit val cm: ClassManifest[U] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
|
||||
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
|
||||
transformFunc.call(new JavaRDD[T](in), time).rdd
|
||||
transformFunc.call(wrapRDD(in), time).rdd
|
||||
dstream.transform(scalaTransform(_, _))
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new DStream in which each RDD is generated by applying a function
|
||||
* on each RDD of this DStream.
|
||||
*/
|
||||
def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
|
||||
JavaPairDStream[K2, V2] = {
|
||||
implicit val cmk: ClassManifest[K2] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
|
||||
implicit val cmv: ClassManifest[V2] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
|
||||
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
|
||||
transformFunc.call(wrapRDD(in)).rdd
|
||||
dstream.transform(scalaTransform(_))
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new DStream in which each RDD is generated by applying a function
|
||||
* on each RDD of this DStream.
|
||||
*/
|
||||
def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
|
||||
JavaPairDStream[K2, V2] = {
|
||||
implicit val cmk: ClassManifest[K2] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
|
||||
implicit val cmv: ClassManifest[V2] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
|
||||
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
|
||||
transformFunc.call(wrapRDD(in), time).rdd
|
||||
dstream.transform(scalaTransform(_, _))
|
||||
}
|
||||
|
||||
|
|
|
@ -15,11 +15,14 @@ import org.apache.hadoop.conf.Configuration
|
|||
import spark.api.java.JavaPairRDD
|
||||
import spark.storage.StorageLevel
|
||||
import com.google.common.base.Optional
|
||||
import spark.RDD
|
||||
|
||||
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
|
||||
implicit val kManifiest: ClassManifest[K],
|
||||
implicit val vManifest: ClassManifest[V])
|
||||
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
|
||||
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
|
||||
|
||||
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
|
||||
|
||||
// =======================================================================
|
||||
// Methods common to all DStream's
|
||||
|
|
|
@ -254,7 +254,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
|||
/**
|
||||
* Registers an output stream that will be computed every interval
|
||||
*/
|
||||
def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
|
||||
def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
|
||||
ssc.registerOutputStream(outputStream.dstream)
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,8 @@ import org.junit.Test;
|
|||
import scala.Tuple2;
|
||||
import spark.HashPartitioner;
|
||||
import spark.api.java.JavaRDD;
|
||||
import spark.api.java.JavaRDDLike;
|
||||
import spark.api.java.JavaPairRDD;
|
||||
import spark.api.java.JavaSparkContext;
|
||||
import spark.api.java.function.*;
|
||||
import spark.storage.StorageLevel;
|
||||
|
@ -293,8 +295,9 @@ public class JavaAPISuite implements Serializable {
|
|||
Arrays.asList(6,7,8),
|
||||
Arrays.asList(9,10,11));
|
||||
|
||||
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
|
||||
JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
|
||||
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
|
||||
JavaDStream<Integer> transformed =
|
||||
stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
|
||||
@Override
|
||||
public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
|
||||
return in.map(new Function<Integer, Integer>() {
|
||||
|
@ -741,6 +744,90 @@ public class JavaAPISuite implements Serializable {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPairTransform() {
|
||||
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
|
||||
Arrays.asList(
|
||||
new Tuple2<Integer, Integer>(3, 5),
|
||||
new Tuple2<Integer, Integer>(1, 5),
|
||||
new Tuple2<Integer, Integer>(4, 5),
|
||||
new Tuple2<Integer, Integer>(2, 5)),
|
||||
Arrays.asList(
|
||||
new Tuple2<Integer, Integer>(2, 5),
|
||||
new Tuple2<Integer, Integer>(3, 5),
|
||||
new Tuple2<Integer, Integer>(4, 5),
|
||||
new Tuple2<Integer, Integer>(1, 5)));
|
||||
|
||||
List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
|
||||
Arrays.asList(
|
||||
new Tuple2<Integer, Integer>(1, 5),
|
||||
new Tuple2<Integer, Integer>(2, 5),
|
||||
new Tuple2<Integer, Integer>(3, 5),
|
||||
new Tuple2<Integer, Integer>(4, 5)),
|
||||
Arrays.asList(
|
||||
new Tuple2<Integer, Integer>(1, 5),
|
||||
new Tuple2<Integer, Integer>(2, 5),
|
||||
new Tuple2<Integer, Integer>(3, 5),
|
||||
new Tuple2<Integer, Integer>(4, 5)));
|
||||
|
||||
JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
|
||||
ssc, inputData, 1);
|
||||
JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
|
||||
|
||||
JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
|
||||
new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
|
||||
@Override
|
||||
public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
|
||||
return in.sortByKey();
|
||||
}
|
||||
});
|
||||
|
||||
JavaTestUtils.attachTestOutputStream(sorted);
|
||||
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
|
||||
|
||||
Assert.assertEquals(expected, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPairToNormalRDDTransform() {
|
||||
List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
|
||||
Arrays.asList(
|
||||
new Tuple2<Integer, Integer>(3, 5),
|
||||
new Tuple2<Integer, Integer>(1, 5),
|
||||
new Tuple2<Integer, Integer>(4, 5),
|
||||
new Tuple2<Integer, Integer>(2, 5)),
|
||||
Arrays.asList(
|
||||
new Tuple2<Integer, Integer>(2, 5),
|
||||
new Tuple2<Integer, Integer>(3, 5),
|
||||
new Tuple2<Integer, Integer>(4, 5),
|
||||
new Tuple2<Integer, Integer>(1, 5)));
|
||||
|
||||
List<List<Integer>> expected = Arrays.asList(
|
||||
Arrays.asList(3,1,4,2),
|
||||
Arrays.asList(2,3,4,1));
|
||||
|
||||
JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
|
||||
ssc, inputData, 1);
|
||||
JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
|
||||
|
||||
JavaDStream<Integer> firstParts = pairStream.transform(
|
||||
new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
|
||||
@Override
|
||||
public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
|
||||
return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
|
||||
@Override
|
||||
public Integer call(Tuple2<Integer, Integer> in) {
|
||||
return in._1();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
JavaTestUtils.attachTestOutputStream(firstParts);
|
||||
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
|
||||
|
||||
Assert.assertEquals(expected, result);
|
||||
}
|
||||
|
||||
public void testMapValues() {
|
||||
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
|
||||
|
||||
|
|
|
@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase {
|
|||
* Attach a provided stream to it's associated StreamingContext as a
|
||||
* [[spark.streaming.TestOutputStream]].
|
||||
**/
|
||||
def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
|
||||
dstream: JavaDStreamLike[T, This]) = {
|
||||
def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
|
||||
R <: spark.api.java.JavaRDDLike[T, R]](
|
||||
dstream: JavaDStreamLike[T, This, R]) = {
|
||||
implicit val cm: ClassManifest[T] =
|
||||
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
|
||||
val ostream = new TestOutputStream(dstream.dstream,
|
||||
|
|
Loading…
Reference in a new issue