Checkpointing in Streaming java API

Patrick Wendell 2013-01-17 12:21:44 -08:00
parent 61b877c688
commit 82b8707c6b
3 changed files with 93 additions and 3 deletions

View file

@@ -13,14 +13,29 @@ import java.io.InputStream
import java.util.{Map => JMap}
class JavaStreamingContext(val ssc: StreamingContext) {
def this(master: String, frameworkName: String, batchDuration: Duration) =
this(new StreamingContext(master, frameworkName, batchDuration))
// TODOs:
// - Test StreamingContext functions
// - Test to/from Hadoop functions
// - Support creating and registering InputStreams
/**
* Creates a StreamingContext.
* @param master Name of the Spark Master
* @param frameworkName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(master: String, frameworkName: String, batchDuration: Duration) =
this(new StreamingContext(master, frameworkName, batchDuration))
/**
* Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
def this(path: String) = this (new StreamingContext(path))
/**
* Create an input stream that pulls messages from a Kafka broker.
* @param hostname Zookeeper hostname.
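
For reference, a minimal usage sketch of the two constructors documented above, outside the test harness. This sketch is not part of the commit: the master URL, application name, and checkpoint path are hypothetical, the input-stream wiring is elided (the TODO above notes that creating and registering input streams is not yet exposed through the Java API), and the import paths assume the pre-Apache spark.* package layout of this era.

import spark.api.java.function.Function;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class CheckpointRecoverySketch {
  public static void main(String[] args) throws Exception {
    // Fresh context with a 1-second batch interval; names and paths are illustrative.
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "CheckpointExample", new Duration(1000));
    // Enable periodic checkpointing of the DStream graph, as the test in this commit does.
    ssc.checkpoint("/tmp/checkpoint-example", new Duration(1000));

    // ... attach an input stream here and build a pipeline, e.g. mapping each word
    // to its length as the test below does:
    //   JavaDStream<Integer> lengths = words.map(new Function<String, Integer>() {
    //     @Override
    //     public Integer call(String s) throws Exception { return s.length(); }
    //   });

    ssc.start();

    // After the driver stops, the new single-argument constructor re-creates the
    // context (and its DStream graph) from the checkpoint directory:
    JavaStreamingContext recovered = new JavaStreamingContext("/tmp/checkpoint-example");
    recovered.start();
  }
}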

View file

@@ -8,7 +8,7 @@ import java.util.ArrayList
import collection.JavaConversions._
/** Exposes streaming test functionality in a Java-friendly way. */
object JavaTestUtils extends TestSuiteBase {
trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
@@ -56,3 +56,10 @@ object JavaTestUtils extends TestSuiteBase {
}
}
object JavaTestUtils extends JavaTestBase {
}
object JavaCheckpointTestUtils extends JavaTestBase {
// Wait in real time while the streams run so that checkpoint data actually gets
// written out between batches (the test base skips this waiting by default).
override def actuallyWait = true
}

View file

@@ -3,6 +3,7 @@ package spark.streaming;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.After;
import org.junit.Assert;
@@ -17,6 +18,7 @@ import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import sun.org.mozilla.javascript.annotations.JSFunction;
@@ -871,6 +873,72 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("this", "is"),
Arrays.asList("a", "test"),
Arrays.asList("counting", "letters"));
List<List<Integer>> expectedInitial = Arrays.asList(
Arrays.asList(4,2));
List<List<Integer>> expectedFinal = Arrays.asList(
Arrays.asList(1,4),
Arrays.asList(8,7));
File tempDir = Files.createTempDir();
sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
}
});
JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> initialResult = JavaTestUtils.runStreams(sc, 1, 1);
assertOrderInvariantEquals(expectedInitial, initialResult);
Thread.sleep(1000);
sc.stop();
sc = new JavaStreamingContext(tempDir.getAbsolutePath());
sc.start();
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2);
assertOrderInvariantEquals(expectedFinal, finalResult);
}
/** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("this", "is"),
Arrays.asList("a", "test"),
Arrays.asList("counting", "letters"));
List<List<Integer>> expected = Arrays.asList(
Arrays.asList(4,2),
Arrays.asList(1,4),
Arrays.asList(8,7));
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
}
});
JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
letterCount.checkpoint(new Duration(1000));
List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3);
assertOrderInvariantEquals(expected, result1);
}
*/
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.