Changed DStream member access permissions from private to protected. Updated StateDStream to checkpoint RDDs and forget lineage.
parent 389a78722c
commit 7c09ad0e04
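The interesting part of this change is in StateDStream.getOrCompute below: once a state RDD's partitions have been persisted to the block store, the RDD is swapped for a BlockRDD that reads those blocks directly, cutting off the ever-growing lineage chain. A minimal sketch of that trick, assuming the pre-1.0 internals this commit is written against (RDDs expose `splits`, and cached partitions are keyed "rdd:<rddId>:<splitIndex>"); `truncateLineage` is a hypothetical helper, not part of the commit:

import spark.{BlockRDD, RDD, SparkContext}

// Hypothetical helper: replace a fully persisted RDD with a BlockRDD over its
// cached blocks. A BlockRDD has no parent dependencies, so the original
// lineage is forgotten and can be garbage-collected.
def truncateLineage[T: ClassManifest](sc: SparkContext, rdd: RDD[T]): RDD[T] = {
  val blockIds = rdd.splits.map(s => "rdd:" + rdd.id + ":" + s.index)
  new BlockRDD[T](sc, blockIds)
}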
@@ -94,7 +94,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
   def getStorageLevel = storageLevel
 
-  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+  def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER_2): RDD[T] = {
     if (!level.useDisk && level.replication < 2) {
       throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
     }
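The new default matters because checkpointed RDDs forget their lineage: a block held only in one executor's memory could never be recomputed after a failure, so the guard insists on disk or replication. Roughly how it plays out, assuming some existing `rdd: RDD[String]` and that the `_2` suffix denotes a replication factor of 2 (an inference from the guard, not stated in the diff):

import spark.storage.StorageLevel

rdd.checkpoint()                                   // ok: default DISK_AND_MEMORY_DESER_2 is replicated
rdd.checkpoint(StorageLevel.DISK_AND_MEMORY_DESER) // ok: single copy, but backed by disk
// rdd.checkpoint(someMemoryOnlyLevel)             // would throw: no disk and replication < 2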
@@ -41,17 +41,17 @@ extends Logging with Serializable {
    */
 
   // Variable to store the RDDs generated earlier in time
-  @transient private val generatedRDDs = new HashMap[Time, RDD[T]] ()
+  @transient protected val generatedRDDs = new HashMap[Time, RDD[T]] ()
 
   // Variable to be set to the first time seen by the DStream (effective time zero)
-  private[streaming] var zeroTime: Time = null
+  protected[streaming] var zeroTime: Time = null
 
   // Variable to specify storage level
-  private var storageLevel: StorageLevel = StorageLevel.NONE
+  protected var storageLevel: StorageLevel = StorageLevel.NONE
 
   // Checkpoint level and checkpoint interval
-  private var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
-  private var checkpointInterval: Time = null
+  protected var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
+  protected var checkpointInterval: Time = null
 
   // Change this RDD's storage level
   def persist(
@@ -84,7 +84,7 @@ extends Logging with Serializable {
    * the validity of future times is calculated. This method also recursively initializes
    * its parent DStreams.
    */
-  def initialize(time: Time) {
+  protected[streaming] def initialize(time: Time) {
     if (zeroTime == null) {
       zeroTime = time
     }
@@ -93,7 +93,7 @@ extends Logging with Serializable {
   }
 
   /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
-  private def isTimeValid (time: Time): Boolean = {
+  protected def isTimeValid (time: Time): Boolean = {
     if (!isInitialized) {
       throw new Exception (this.toString + " has not been initialized")
     } else if (time < zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
@@ -208,7 +208,7 @@ extends Logging with Serializable {
     new TransformedDStream(this, ssc.sc.clean(transformFunc))
   }
 
-  private[streaming] def toQueue = {
+  def toQueue = {
     val queue = new ArrayBlockingQueue[RDD[T]](10000)
     this.foreachRDD(rdd => {
       queue.add(rdd)
@@ -7,7 +7,7 @@ import scala.collection.mutable.Queue
 import scala.collection.mutable.ArrayBuffer
 
 class QueueInputDStream[T: ClassManifest](
-    ssc: StreamingContext,
+    @transient ssc: StreamingContext,
     val queue: Queue[RDD[T]],
     oneAtATime: Boolean,
     defaultRDD: RDD[T]
@@ -1,10 +1,11 @@
 package spark.streaming
 
 import spark.RDD
+import spark.BlockRDD
 import spark.Partitioner
 import spark.MapPartitionsRDD
 import spark.SparkContext._
 
 import spark.storage.StorageLevel
 
 class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
     parent: DStream[(K, V)],
@@ -22,6 +23,47 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
 
   override def slideTime = parent.slideTime
 
+  override def getOrCompute(time: Time): Option[RDD[(K, S)]] = {
+    generatedRDDs.get(time) match {
+      case Some(oldRDD) => {
+        if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
+          val r = oldRDD
+          val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index)
+          val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) {
+            override val partitioner = oldRDD.partitioner
+          }
+          generatedRDDs.update(time, checkpointedRDD)
+          logInfo("Updated RDD of time " + time + " with its checkpointed version")
+          Some(checkpointedRDD)
+        } else {
+          Some(oldRDD)
+        }
+      }
+      case None => {
+        if (isTimeValid(time)) {
+          compute(time) match {
+            case Some(newRDD) => {
+              if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
+                newRDD.persist(checkpointLevel)
+                logInfo("Persisting " + newRDD + " to " + checkpointLevel + " at time " + time)
+              } else if (storageLevel != StorageLevel.NONE) {
+                newRDD.persist(storageLevel)
+                logInfo("Persisting " + newRDD + " to " + storageLevel + " at time " + time)
+              }
+              generatedRDDs.put(time, newRDD)
+              Some(newRDD)
+            }
+            case None => {
+              None
+            }
+          }
+        } else {
+          None
+        }
+      }
+    }
+  }
+
   override def compute(validTime: Time): Option[RDD[(K, S)]] = {
 
     // Try to get the previous state RDD
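Two details of the getOrCompute above are worth noting: a fresh RDD is persisted at checkpointLevel on checkpoint boundaries and at the ordinary storageLevel otherwise, and the `oldRDD.dependencies.size > 0` test ensures an already-truncated BlockRDD is not re-wrapped on a later lookup. The boundary test itself is plain modular arithmetic; a sketch with Time reduced to milliseconds (hypothetical standalone function, where the real code uses Time.isMultipleOf):

// zeroTime = 0 ms, checkpointInterval = 10000 ms:
// batches at t = 10000, 20000, ... are checkpointed; all others only cached.
def isCheckpointTime(timeMs: Long, zeroTimeMs: Long, intervalMs: Long): Boolean =
  intervalMs > 0 && (timeMs - zeroTimeMs) % intervalMs == 0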
@@ -29,26 +71,27 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
 
       case Some(prevStateRDD) => {    // If previous state RDD exists
 
-        // Define the function for the mapPartition operation on cogrouped RDD;
-        // first map the cogrouped tuple to tuples of required type,
-        // and then apply the update function
-        val func = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
-          val i = iterator.map(t => {
-            (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S]))
-          })
-          updateFunc(i)
-        }
-
         // Try to get the parent RDD
         parent.getOrCompute(validTime) match {
           case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
+
+            // Define the function for the mapPartition operation on cogrouped RDD;
+            // first map the cogrouped tuple to tuples of required type,
+            // and then apply the update function
+            val updateFuncLocal = updateFunc
+            val mapPartitionFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+              val i = iterator.map(t => {
+                (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S]))
+              })
+              updateFuncLocal(i)
+            }
             val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
-            val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, func)
-            logDebug("Generating state RDD for time " + validTime)
+            val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, mapPartitionFunc)
+            //logDebug("Generating state RDD for time " + validTime)
             return Some(stateRDD)
           }
           case None => {    // If parent RDD does not exist, then return old state RDD
-            logDebug("Generating state RDD for time " + validTime + " (no change)")
+            //logDebug("Generating state RDD for time " + validTime + " (no change)")
             return Some(prevStateRDD)
           }
         }
@@ -56,23 +99,25 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
 
       case None => {    // If previous session RDD does not exist (first input data)
 
-        // Define the function for the mapPartition operation on grouped RDD;
-        // first map the grouped tuple to tuples of required type,
-        // and then apply the update function
-        val func = (iterator: Iterator[(K, Seq[V])]) => {
-          updateFunc(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S])))
-        }
-
        // Try to get the parent RDD
         parent.getOrCompute(validTime) match {
           case Some(parentRDD) => {   // If parent RDD exists, then compute as usual
+
+            // Define the function for the mapPartition operation on grouped RDD;
+            // first map the grouped tuple to tuples of required type,
+            // and then apply the update function
+            val updateFuncLocal = updateFunc
+            val mapPartitionFunc = (iterator: Iterator[(K, Seq[V])]) => {
+              updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S])))
+            }
+
             val groupedRDD = parentRDD.groupByKey(partitioner)
-            val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, func)
-            logDebug("Generating state RDD for time " + validTime + " (first)")
+            val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, mapPartitionFunc)
+            //logDebug("Generating state RDD for time " + validTime + " (first)")
             return Some(sessionRDD)
           }
           case None => { // If parent RDD does not exist, then nothing to do!
-            logDebug("Not generating state RDD (no previous state, no parent)")
+            //logDebug("Not generating state RDD (no previous state, no parent)")
             return None
           }
         }
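The `val updateFuncLocal = updateFunc` copies in both branches above are the standard closure-capture trick: referencing the field `updateFunc` inside the partition function would pull `this` (the whole StateDStream, and with it the DStream graph) into the task closure, whereas copying it into a local first means only the function itself is serialized. A stripped-down illustration (hypothetical class, not from this commit):

import spark.RDD

class Holder(f: Int => Int) { // Holder itself is not serializable
  def mapped(rdd: RDD[Int]): RDD[Int] = {
    // rdd.map(x => f(x)) would capture `this` and fail at serialization time
    val fLocal = f
    rdd.map(x => fLocal(x)) // captures only the serializable function value
  }
}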
@@ -107,12 +107,12 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
       Seq(("a", 3), ("b", 3), ("c", 3))
     )
 
-    val updateStateOp =(s: DStream[String]) => {
+    val updateStateOp = (s: DStream[String]) => {
       val updateFunc = (values: Seq[Int], state: RichInt) => {
         var newState = 0
         if (values != null) newState += values.reduce(_ + _)
         if (state != null) newState += state.self
-        //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
+        println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
         new RichInt(newState)
       }
       s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))