From 82b8707c6bbb3926e59c241b6e6d5ead5467aae7 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 17 Jan 2013 12:21:44 -0800 Subject: [PATCH] Checkpointing in Streaming java API --- .../api/java/JavaStreamingContext.scala | 19 +++++- streaming/src/test/scala/JavaTestUtils.scala | 9 ++- .../scala/spark/streaming/JavaAPISuite.java | 68 +++++++++++++++++++ 3 files changed, 93 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 2833793b94..ebbc516b38 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -13,14 +13,29 @@ import java.io.InputStream import java.util.{Map => JMap} class JavaStreamingContext(val ssc: StreamingContext) { - def this(master: String, frameworkName: String, batchDuration: Duration) = - this(new StreamingContext(master, frameworkName, batchDuration)) // TODOs: // - Test StreamingContext functions // - Test to/from Hadoop functions // - Support creating and registering InputStreams + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param frameworkName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(master: String, frameworkName: String, batchDuration: Duration) = + this(new StreamingContext(master, frameworkName, batchDuration)) + + /** + * Re-creates a StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. + */ + def this(path: String) = this (new StreamingContext(path)) + /** * Create an input stream that pulls messages form a Kafka Broker. * @param hostname Zookeper hostname. diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala index 24ebc15e38..56349837e5 100644 --- a/streaming/src/test/scala/JavaTestUtils.scala +++ b/streaming/src/test/scala/JavaTestUtils.scala @@ -8,7 +8,7 @@ import java.util.ArrayList import collection.JavaConversions._ /** Exposes streaming test functionality in a Java-friendly way. */ -object JavaTestUtils extends TestSuiteBase { +trait JavaTestBase extends TestSuiteBase { /** * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. @@ -56,3 +56,10 @@ object JavaTestUtils extends TestSuiteBase { } } +object JavaTestUtils extends JavaTestBase { + +} + +object JavaCheckpointTestUtils extends JavaTestBase { + override def actuallyWait = true +} \ No newline at end of file diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index 41fd9f99ff..8a63e8cd3f 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -3,6 +3,7 @@ package spark.streaming; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.io.Files; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.junit.After; import org.junit.Assert; @@ -17,6 +18,7 @@ import spark.streaming.api.java.JavaDStream; import spark.streaming.api.java.JavaPairDStream; import spark.streaming.api.java.JavaStreamingContext; import spark.streaming.JavaTestUtils; +import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; import sun.org.mozilla.javascript.annotations.JSFunction; @@ -871,6 +873,72 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } + @Test + public void testCheckpointMasterRecovery() throws InterruptedException { + List> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List> expectedInitial = Arrays.asList( + Arrays.asList(4,2)); + List> expectedFinal = Arrays.asList( + Arrays.asList(1,4), + Arrays.asList(8,7)); + + + File tempDir = Files.createTempDir(); + sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + List> initialResult = JavaTestUtils.runStreams(sc, 1, 1); + + assertOrderInvariantEquals(expectedInitial, initialResult); + Thread.sleep(1000); + + sc.stop(); + sc = new JavaStreamingContext(tempDir.getAbsolutePath()); + sc.start(); + List> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2); + assertOrderInvariantEquals(expectedFinal, finalResult); + } + + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @Test + public void testCheckpointofIndividualStream() throws InterruptedException { + List> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List> expected = Arrays.asList( + Arrays.asList(4,2), + Arrays.asList(1,4), + Arrays.asList(8,7)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream letterCount = stream.map(new Function() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + + letterCount.checkpoint(new Duration(1000)); + + List> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3); + assertOrderInvariantEquals(expected, result1); + } + */ + // Input stream tests. These mostly just test that we can instantiate a given InputStream with // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests.