Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.

Tathagata Das 2014-01-12 19:02:27 -08:00
parent 74d0126257
commit 034f89aaab
9 changed files with 41 additions and 10 deletions
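In effect, persist() on a windowed stream now controls the parent's storage level instead of caching the overlapping windowed RDDs themselves. A minimal sketch of the resulting behavior, assuming an existing StreamingContext ssc with a 10-second batch interval (host and port are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds

    val lines = ssc.socketTextStream("localhost", 9999)   // MEMORY_AND_DISK_SER_2 by default
    val windowed = lines.window(Seconds(30), Seconds(10))

    // Forwards the level to lines; the windowed (union-ed) RDDs stay unpersisted.
    windowed.persist(StorageLevel.MEMORY_ONLY)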

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

@@ -43,6 +43,7 @@ object FlumeUtils {
   /**
    * Creates a input stream from a Flume source.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port Port of the slave machine to which the flume data will be sent
    */
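For illustration, a sketch of how the documented default applies to the Flume source, assuming an existing StreamingContext ssc (host and port are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume.FlumeUtils

    val events = FlumeUtils.createStream(ssc, "localhost", 41414)   // default MEMORY_AND_DISK_SER_2
    val eventsExplicit =
      FlumeUtils.createStream(ssc, "localhost", 41414, StorageLevel.MEMORY_ONLY_SER_2)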

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

@@ -78,6 +78,7 @@ object KafkaUtils {
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc JavaStreamingContext object
    * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
    * @param groupId The group id for this consumer
@@ -128,7 +129,7 @@ object KafkaUtils {
    * see http://kafka.apache.org/08/configuration.html
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *               in its own thread
-   * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2.
+   * @param storageLevel RDD storage level.
    */
   def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
       jssc: JavaStreamingContext,
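Because this overload's doc no longer promises a default, callers pass the level explicitly. A sketch against the Scala createStream variant, assuming an existing StreamingContext ssc (the quorum, group id, and topic map are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Consume the "events" topic with one receiver thread, stored at an explicit level.
    val messages = KafkaUtils.createStream(
      ssc, "zk1:2181", "my-consumer-group", Map("events" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER_2)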

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

@@ -44,6 +44,7 @@ object MQTTUtils {
   /**
    * Create an input stream that receives messages pushed by a MQTT publisher.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc JavaStreamingContext object
    * @param brokerUrl Url of remote MQTT publisher
    * @param topic Topic name to subscribe to

external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala

@@ -51,6 +51,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc JavaStreamingContext object
    */
   def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
@@ -62,6 +63,7 @@ object TwitterUtils {
    * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
    * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    * twitter4j.oauth.accessTokenSecret.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc JavaStreamingContext object
    * @param filters Set of filter strings to get only those tweets that match them
    */
@@ -88,6 +90,7 @@ object TwitterUtils {
   /**
    * Create a input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    */
@@ -97,6 +100,7 @@ object TwitterUtils {
   /**
    * Create a input stream that returns tweets received from Twitter.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    * @param filters Set of filter strings to get only those tweets that match them
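A sketch of the Scala-side equivalents, assuming an existing StreamingContext ssc and the twitter4j.oauth.* system properties set; both calls rely on the documented MEMORY_AND_DISK_SER_2 default:

    import org.apache.spark.streaming.twitter.TwitterUtils

    val allTweets      = TwitterUtils.createStream(ssc, None)
    val filteredTweets = TwitterUtils.createStream(ssc, None, Seq("spark", "streaming"))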

streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala

@@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def remember(duration: Duration) {
     this.synchronized {
       if (rememberDuration != null) {
-        throw new Exception("Batch duration already set as " + batchDuration +
+        throw new Exception("Remember duration already set as " + batchDuration +
           ". cannot set it again.")
       }
       rememberDuration = duration
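Since the remember duration is set-once, the corrected message now names the right setting. A sketch of the failure mode, assuming an existing StreamingContext ssc:

    import org.apache.spark.streaming.Minutes

    ssc.remember(Minutes(1))  // sets the remember duration
    ssc.remember(Minutes(2))  // throws: "Remember duration already set as ..."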

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

@@ -168,7 +168,7 @@ class StreamingContext private[streaming] (
   }

   /**
-   * Set the context to periodically checkpoint the DStream operations for master
+   * Set the context to periodically checkpoint the DStream operations for driver
    * fault-tolerance.
    * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
    *                  Note that this must be a fault-tolerant file system like HDFS for
@@ -220,7 +220,7 @@ class StreamingContext private[streaming] (
   def actorStream[T: ClassTag](
       props: Props,
       name: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
     ): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -272,6 +272,7 @@ class StreamingContext private[streaming] (
    * @param hostname Hostname to connect to for receiving data
    * @param port Port to connect to for receiving data
    * @param storageLevel Storage level to use for storing the received objects
+   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    * @tparam T Type of the objects in the received blocks
    */
   def rawSocketStream[T: ClassTag](
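Callers that want the old MEMORY_ONLY_SER_2 behavior for actor streams now have to opt in. A sketch, assuming an existing StreamingContext ssc and a hypothetical receiver actor MyReceiverActor (not part of this commit):

    import akka.actor.Props
    import org.apache.spark.storage.StorageLevel

    // Omitting storageLevel now defaults to MEMORY_AND_DISK_SER_2.
    val custom = ssc.actorStream[String](
      Props(new MyReceiverActor), "my-receiver", StorageLevel.MEMORY_ONLY_SER_2)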

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

@@ -151,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname Hostname to connect to for receiving data
    * @param port Port to connect to for receiving data
    * @param storageLevel Storage level to use for storing the received objects
-   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
   def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
     : JavaDStream[String] = {
@@ -161,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Create a input stream from network source hostname:port. Data is received using
    * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
-   * lines.
+   * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param hostname Hostname to connect to for receiving data
    * @param port Port to connect to for receiving data
    */
@@ -302,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
+   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
    *

streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala

@@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag](
   extends DStream[T](parent.ssc) {

   if (!_windowDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
+      "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")

   if (!_slideDuration.isMultipleOf(parent.slideDuration))
-    throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
-    "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+    throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
+      "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")

+  // Persist parent level by default, as those RDDs are going to be obviously reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)

   def windowDuration: Duration = _windowDuration
@@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag](
   override def parentRememberDuration: Duration = rememberDuration + windowDuration

+  override def persist(level: StorageLevel): DStream[T] = {
+    // Do not let this windowed DStream be persisted as windowed (union-ed) RDDs share underlying
+    // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data.
+    // Instead control the persistence of the parent DStream.
+    parent.persist(level)
+    this
+  }
+
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
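The override exists because consecutive windowed RDDs are unions over overlapping slices of the parent, so each parent RDD belongs to roughly windowDuration / slideDuration windows. A sketch of that arithmetic with illustrative durations:

    import org.apache.spark.streaming.Seconds

    val window = Seconds(30)
    val slide  = Seconds(10)
    // Each parent RDD appears in window/slide = 3 consecutive windowed RDDs, so
    // persisting the windows would hold ~3 copies of every batch, while persisting
    // the parent stores each batch once and still serves every window.
    val copiesPerBatch = window.milliseconds / slide.milliseconds  // = 3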

streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala

@@ -19,6 +19,7 @@ package org.apache.spark.streaming

 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel

 class WindowOperationsSuite extends TestSuiteBase {
@@ -144,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seconds(3)
   )

+  test("window - persistence level") {
+    val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+    val ssc = new StreamingContext(conf, batchDuration)
+    val inputStream = new TestInputStream[Int](ssc, input, 1)
+    val windowStream1 = inputStream.window(batchDuration * 2)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+    windowStream1.persist(StorageLevel.MEMORY_ONLY)
+    assert(windowStream1.storageLevel === StorageLevel.NONE)
+    assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+    ssc.stop()
+  }
+
   // Testing naive reduceByKeyAndWindow (without invertible function)
   testReduceByKeyAndWindow(