More work on StreamingContext

Patrick Wendell 2013-01-10 19:29:22 -08:00
parent 5004eec37c
commit b36c4f7cce


@@ -9,6 +9,7 @@ import spark.storage.StorageLevel
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import java.io.InputStream
import java.util.{Map => JMap}
import scala.collection.JavaConversions._  // for converting the Java topics map below
class JavaStreamingContext(val ssc: StreamingContext) {
  def this(master: String, frameworkName: String, batchDuration: Time) =
@@ -17,10 +18,31 @@ class JavaStreamingContext(val ssc: StreamingContext) {
  // TODOs:
  // - Test StreamingContext functions
  // - Test to/from Hadoop functions
  // - Add checkpoint()/remember()
  // - Support creating your own streams
  // - Support registering InputStreams
  // - Add Kafka Stream
  /**
   * Create an input stream that pulls messages from a Kafka Broker. This overload uses the
   * default initial offsets (pulled from Zookeeper) and the default memory-only RDD storage
   * level.
   * @param hostname Zookeeper hostname.
   * @param port Zookeeper port.
   * @param groupId The group id for this consumer.
   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   * in its own thread.
   */
  def kafkaStream[T](
      hostname: String,
      port: Int,
      groupId: String,
      topics: JMap[String, Int])
    : DStream[T] = {
    implicit val cmt: ClassManifest[T] =
      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
    ssc.kafkaStream(hostname, port, groupId, Map(topics.toSeq: _*))
  }
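
For readers trying out this new overload, a minimal Java usage sketch follows. The package names, the Time(milliseconds) constructor, and the Zookeeper host, port, group id, and topic name are assumptions and placeholders, not part of this commit:

import java.util.HashMap;
import java.util.Map;

import spark.streaming.DStream;
import spark.streaming.Time;
import spark.streaming.api.java.JavaStreamingContext;

public class KafkaStreamExample {
  public static void main(String[] args) {
    // Two-second batches, using the (master, frameworkName, batchDuration)
    // constructor declared at the top of this class
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[2]", "KafkaStreamExample", new Time(2000));

    // Consume the (hypothetical) topic "events" with two threads,
    // one per partition
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("events", 2);

    // Hypothetical Zookeeper host/port and consumer group id
    DStream<String> messages =
        jssc.<String>kafkaStream("zk.example.com", 2181, "example-group", topics);
  }
}

Taking the topics as a JMap[String, Int] is what spares Java callers from building a Scala Map; the wrapper performs the conversion itself via topics.toSeq.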
  /**
   * Create an input stream from a network source hostname:port. Data is received using
   * a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
@@ -162,6 +184,27 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    ssc.registerOutputStream(outputStream.dstream)
  }
  /**
   * Sets the context to periodically checkpoint the DStream operations for master
   * fault-tolerance. By default, the graph will be checkpointed every batch interval.
   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
   * @param interval checkpoint interval; if null, the batch interval is used
   */
  def checkpoint(directory: String, interval: Time = null) {
    ssc.checkpoint(directory, interval)
  }
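
Continuing the Java sketch above: Scala default arguments are not visible to Java callers, so the interval must be passed explicitly, with null selecting the batch-interval default. The HDFS path is a placeholder:

// Checkpoint the DStream graph to an HDFS-compatible directory
// every batch interval (null = use the default)
jssc.checkpoint("hdfs://namenode:8020/spark/checkpoints", null);

// Or checkpoint every 10 seconds instead
jssc.checkpoint("hdfs://namenode:8020/spark/checkpoints", new Time(10000));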
  /**
   * Sets each DStream in this context to remember the RDDs it generated in the last given
   * duration. DStreams remember RDDs only for a limited duration of time and release them for
   * garbage collection. This method allows the developer to specify how long to remember the
   * RDDs (if the developer wishes to query old data outside the DStream computation).
   * @param duration Minimum duration that each DStream should remember its RDDs
   */
  def remember(duration: Time) {
    ssc.remember(duration)
  }
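
A hedged sketch of remember() from the same Java context, again assuming Time wraps a duration in milliseconds:

// Keep each DStream's generated RDDs around for at least one minute,
// so recent batches can be queried outside the DStream computation
jssc.remember(new Time(60000));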
  /**
   * Starts the execution of the streams.
   */
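
Finally, assuming this wrapper forwards to StreamingContext.start() as the comment above suggests, the example context would be started with:

// With inputs and outputs registered, begin receiving data
// and executing the streams
jssc.start();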