Adding some initial tests to streaming API.

This commit is contained in:
Patrick Wendell 2013-01-04 11:19:20 -08:00
parent b607c9e916
commit 867a7455e2
3 changed files with 79 additions and 3 deletions

View file

@ -1,6 +1,7 @@
package spark.streaming.api.java
import scala.collection.JavaConversions._
import java.util.{List => JList}
import spark.streaming._
import dstream.SparkFlumeEvent

View file

@ -0,0 +1,42 @@
package spark.streaming
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import java.util.{List => JList}
import spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
import spark.streaming._
import java.util.ArrayList
import collection.JavaConversions._
/** Exposes core test functionality in a Java-friendly way. */
object JavaTestUtils extends TestSuiteBase {
def attachTestInputStream[T](ssc: JavaStreamingContext,
data: JList[JList[T]], numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
}
def attachTestOutputStream[T](dstream: JavaDStream[T]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream,
new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
dstream.dstream.ssc.registerOutputStream(ostream)
}
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
out
}
}

View file

@ -1,14 +1,20 @@
package spark.streaming;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import spark.api.java.JavaRDD;
import spark.api.java.function.Function;
import spark.api.java.function.Function2;
import spark.streaming.JavaTestUtils;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@ -30,9 +36,36 @@ public class JavaAPISuite implements Serializable {
}
@Test
public void simpleTest() {
sc.textFileStream("/var/log/syslog").print();
sc.start();
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3,4), Arrays.asList(3,4,5), Arrays.asList(3));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
Assert.assertTrue(result.equals(
Arrays.asList(Arrays.asList(4), Arrays.asList(3), Arrays.asList(1))));
}
@Test
public void testMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon"));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
}
});
JavaTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2);
Assert.assertTrue(result.equals(
Arrays.asList(Arrays.asList(5, 5), Arrays.asList(9, 4))));
}
public static void main(String[] args) {